// Copyright (c) 2020-present, INSPUR Co, Ltd. All rights reserved.
// This source code is licensed under Apache 2.0 License.

#pragma once
#include <assert.h>
#include <stdlib.h>

#include <algorithm>
#include <atomic>
#include <iostream>

#include "pure_mem/epoche.h"
#include "pure_mem/key_index/abstract_uk_index.h"
#include "pure_mem/key_index/userkey_index.h"
#include "pure_mem/mvcc_key.h"
#include "pure_mem/version_node.h"
#include "rocksdb/slice.h"
#include "util/allocator.h"
#include "util/coding.h"
#include "util/random.h"

namespace rocksdb {

/**
 * memtable中索引接口封装，与 inlineskiplist 等具有相同的接口
 * InlineUserKeyIndex　在初始化时，增加　UserKeyIndexType传入，用于要求使用的索引算法
 *
 */
template <class Comparator> class InlineUserKeyIndex {
public:
  using DecodedKey =
      typename std::remove_reference<Comparator>::type::DecodedType;

  std::mutex mtx_;

  std::unordered_map<std::string, TreeVoidRef*> hashMap_;  ///不用string

  // Create a new InlineUserKeyIndex object that will use "cmp" for comparing
  // keys, and will allocate memory using "*allocator".  Objects allocated
  // in the allocator must remain allocated for the lifetime of the
  // skiplist object.
  explicit InlineUserKeyIndex(const Comparator cmp, UserKeyIndexType type);

  ~InlineUserKeyIndex() {
    delete userKeyIndex_;
  }

  // Allocates a key and a skip-list node, returning a pointer to the key
  // portion of the node.  This method is thread-safe if the allocator
  // is thread-safe.
  char *AllocateKey(size_t key_size);

  // Inserts a key allocated by AllocateKey, after the actual key value
  // has been filled in.
  //
  // REQUIRES: nothing that compares equal to key is currently in the list.
  // REQUIRES: no concurrent calls to any of inserts.
  bool Insert(const char *key);

  // Inserts a key allocated by AllocateKey with a hint of last insert
  // position in the skip-list. If hint points to nullptr, a new hint will be
  // populated, which can be used in subsequent calls.
  //
  // It can be used to optimize the workload where there are multiple groups
  // of keys, and each key is likely to insert to a location close to the last
  // inserted key in the same group. One example is sequential inserts.
  //
  // REQUIRES: nothing that compares equal to key is currently in the list.
  // REQUIRES: no concurrent calls to any of inserts.
  bool InsertWithHint(const char *key, void **hint);

  // Like Insert, but external synchronization is not required.
  bool InsertConcurrently(const char *key);

  // Returns true iff an entry that compares equal to key is in the list.
  bool Contains(const char *key) const;

  // Return estimated number of entries smaller than `key`.
  uint64_t EstimateCount(const char *key) const { return 0; };

  // Iteration over the contents of a skip list
  class Iterator {
  public:
    // Initialize an iterator over the specified list.
    // The returned iterator is not valid.
    explicit Iterator(const InlineUserKeyIndex *list);

    // Change the underlying skiplist used for this iterator
    // This enables us not changing the iterator without deallocating
    // an old one and then allocating a new one
    void SetList(const InlineUserKeyIndex *list);

    // Returns true iff the iterator is positioned at a valid node.
    bool Valid() const;

    // Returns the key at the current position.
    // REQUIRES: Valid()
    const char *key() const;

    VersionNode *node();

    TreeVoidRef *userKeyRef() {
      assert(Valid());
      return artRef_;
    }

    // Advances to the next position.
    // REQUIRES: Valid()
    void Next();

    // Advances to the previous position.
    // REQUIRES: Valid()
    void Prev();

    // Advance to the first entry with a key >= target
    void Seek(const char *target);

    void Seek(const Slice& key);

    void HashSeek(const char* target);
    void HashSeek(const Slice& key);

    void DelCurNode();

    void *PushCurNode();

    // Retreat to the last entry with a key <= target
    void SeekForPrev(const char *target);
    void HashSeekForPrev(const char* target);
    void HashSeekForPrev(const Slice& target);
    void HashSeekForPrevOrEqual(const Slice& target);

    // Position at the first entry in list.
    // Final state of iterator is Valid() iff list is not empty.
    void SeekToFirst();

    // Position at the last entry in list.
    // Final state of iterator is Valid() iff list is not empty.
    void SeekToLast();

  private:
    const InlineUserKeyIndex *list_;
    VersionNode *node_;
    TreeVoidRef *artRef_;
  };

  InlineUserKeyIndex::Iterator *GetIterator() {
    return new InlineUserKeyIndex::Iterator(this);
  }

private:
  // Allocator* const allocator_;  // Allocator used for allocations of nodes
  // Immutable after construction
  Comparator const compare_;

  // VersionNode* const head_;
  IUserKeyIndex *userKeyIndex_;

  VersionNode *AllocateNode(size_t key_size, int height);

  bool Equal(const char *a, const char *b) const {
    return (compare_(a, b) == 0);
  }

  bool Equal(const char *a, const Slice &b) const {
    return (compare_(a, b) == 0);
  }

  bool LessThan(const char *a, const char *b) const {
    return (compare_(a, b) < 0);
  }

  bool LessThan(const char *a, const Slice& b) const {
    return (compare_(a, b) < 0);
  }

  // Return true if key is greater than the data stored in "n".  Null n
  // is considered infinite.  n should not be head_.
  bool KeyIsAfterNode(const char *key, VersionNode *n) const;
  bool KeyIsAfterNode(const DecodedKey &key, VersionNode *n) const;

  // Returns the earliest node with a key >= key.
  // Return nullptr if there is no such node.
  VersionNode *FindGreaterOrEqual(const char *key) const;
  VersionNode *FindGreaterOrEqualRef(const char *key,
                                     TreeVoidRef *&curRef) const;
  VersionNode* HashFindGreaterOrEqualRef(const char *key, TreeVoidRef* &curRef) const;

  VersionNode *FindGreaterOrEqualRef(const Slice& key,
                                     TreeVoidRef *&curRef) const;
  VersionNode* HashFindGreaterOrEqualRef(const Slice& key, TreeVoidRef* &curRef) const;

  // Return the latest node with a key < key.
  // Return head_ if there is no such node.
  // Fills prev[level] with pointer to previous node at "level" for every
  // level in [0..max_height_-1], if prev is non-null.
  VersionNode *FindLessThan(const char *key,
                            VersionNode **prev = nullptr) const;

  // Return the last node in the list.
  // Return head_ if list is empty.
  VersionNode *FindLast() const;
  VersionNode *FindLastRef(TreeVoidRef *&curRef) const;

  // No copying allowed
  InlineUserKeyIndex(const InlineUserKeyIndex &);
  InlineUserKeyIndex &operator=(const InlineUserKeyIndex &);
};

template <class Comparator>
inline InlineUserKeyIndex<Comparator>::Iterator::Iterator(
    const InlineUserKeyIndex *list) {
  SetList(list);
}

template <class Comparator>
inline void InlineUserKeyIndex<Comparator>::Iterator::SetList(
    const InlineUserKeyIndex *list) {
  list_ = list;
  node_ = nullptr;
  artRef_ = nullptr;
}

template <class Comparator>
inline bool InlineUserKeyIndex<Comparator>::Iterator::Valid() const {
  return node_ != nullptr && artRef_ != nullptr;
}

template <class Comparator>
inline const char *InlineUserKeyIndex<Comparator>::Iterator::key() const {
  assert(Valid());
  return node_->Key();
}

template <class Comparator>
inline VersionNode *InlineUserKeyIndex<Comparator>::Iterator::node() {
  assert(Valid());
  return node_;
}

template <class Comparator>
inline void InlineUserKeyIndex<Comparator>::Iterator::Next() {
  assert(Valid());
  node_ = node_->Next();
  while (node_ == nullptr) {
    artRef_ = artRef_->Next();
    if (artRef_ == nullptr)
      return;
    node_ = artRef_->getContentList();
  }
}

template <class Comparator>
inline void InlineUserKeyIndex<Comparator>::Iterator::Prev() {
  assert(Valid());
  VersionNode *curNode = node_->Prev();
  node_ = curNode;
  while (node_ == nullptr) {
    artRef_ = artRef_->Prev();
    if (artRef_ == nullptr)
      return;
    curNode = artRef_->getContentList();
    while (curNode != nullptr) {
      node_ = curNode;
      curNode = curNode->Next();
    }
  }
}

template <class Comparator>
inline void InlineUserKeyIndex<Comparator>::Iterator::Seek(const char *target) {
  node_ = list_->FindGreaterOrEqualRef(target, artRef_);
}

template <class Comparator>
inline void InlineUserKeyIndex<Comparator>::Iterator::Seek(const Slice& key) {
  node_ = list_->FindGreaterOrEqualRef(key, artRef_);
}

template <class Comparator>
void InlineUserKeyIndex<Comparator>::Iterator::DelCurNode() {
  assert(artRef_ != nullptr);
  VersionNode *beDeleteNode = node_, *prev, *next;
  while (true) {
    bool findCurNode = false;
    prev = artRef_->getContentList();
    if ((void *)prev == (void *)beDeleteNode) {
      if (!artRef_->CASContentList(beDeleteNode, beDeleteNode->Next())) {
        continue;
      }
      break;
    }

    while (prev != nullptr) {
      next = prev->Next();
      if ((void *)next == (void *)beDeleteNode) {
        findCurNode = true;
        break;
      }
      prev = next;
    }
    if (!findCurNode) {
      std::cout << "ERROR. cannot find current node in content list."
                << std::endl;
      exit(1);
    }
    if (!prev->CASNext(beDeleteNode, beDeleteNode->Next())) {
      continue;
    }
    break;
  }

  rocksdb::DeleteWhileNoRefs::getInstance()->markNodeForDeletion(
      beDeleteNode, DELETION_TYPE_VERSION);
  this->Next();
}

template <class Comparator>
void *InlineUserKeyIndex<Comparator>::Iterator::PushCurNode() {
  assert(artRef_ != nullptr);
  VersionNode *beDeleteNode = node_, *prev, *next;
  while (true) {
    bool findCurNode = false;
    prev = artRef_->getContentList();
    if ((void *)prev == (void *)beDeleteNode) {
      if (!artRef_->CASContentList(beDeleteNode, beDeleteNode->Next())) {
        continue;
      }
      break;
    }

    while (prev != nullptr) {
      next = prev->Next();
      if ((void *)next == (void *)beDeleteNode) {
        findCurNode = true;
        break;
      }
      prev = next;
    }
    if (!findCurNode) {
      std::cout << "ERROR. cannot find current node in content list."
                << std::endl;
      exit(1);
    }
    next = beDeleteNode->Next();
    if (!prev->CASNext(beDeleteNode, next)) {
      continue;
    }
    if (next != nullptr)
      next->CASPrev(beDeleteNode, prev);
    break;
  }

  this->Next();
  return beDeleteNode;
}

template <class Comparator>
inline void
InlineUserKeyIndex<Comparator>::Iterator::SeekForPrev(const char *target) {
  Seek(target);
  if (!Valid()) {
    SeekToLast();
  }
  while (Valid() && list_->LessThan(target, key())) {
    Prev();
  }
}

template <class Comparator>
inline void InlineUserKeyIndex<Comparator>::Iterator::SeekToFirst() {
  artRef_ = this->list_->userKeyIndex_->head();
  if (artRef_ == nullptr) {
    node_ = nullptr;
    return;
  }
  node_ = artRef_->getContentList();
  while (node_ == nullptr) {
    artRef_ = artRef_->Next();
    if (artRef_ == nullptr)
      return;
    node_ = artRef_->getContentList();
  }
}

template <class Comparator>
inline void InlineUserKeyIndex<Comparator>::Iterator::SeekToLast() {
  node_ = list_->FindLastRef(artRef_);
}

template<class Comparator>
void InlineUserKeyIndex<Comparator>::Iterator::HashSeek(const char* target) {
  //std::unique_lock<std::mutex> lk(const_cast<InlineART*>(list_)->mtx_);
  node_ = list_->HashFindGreaterOrEqualRef(target, artRef_);
}

template<class Comparator>
void InlineUserKeyIndex<Comparator>::Iterator::HashSeek(const Slice& key) {
  //std::unique_lock<std::mutex> lk(const_cast<InlineART*>(list_)->mtx_);
  node_ = list_->HashFindGreaterOrEqualRef(key, artRef_);
}

template<class Comparator>
void InlineUserKeyIndex<Comparator>::Iterator::HashSeekForPrev(const char* target) {
  HashSeek(target);
  if (!Valid()) {
    SeekToLast();
  }
  while (Valid() && list_->LessThan(target, key())) {
    Prev();
  }
}

template<class Comparator>
void InlineUserKeyIndex<Comparator>::Iterator::HashSeekForPrev(const Slice& target) {
  HashSeek(target);
  if (!Valid()) {
    SeekToLast();
  }
  while (Valid() && !list_->LessThan(key(), target)) {
    Prev();
  }
}

template<class Comparator>
void InlineUserKeyIndex<Comparator>::Iterator::HashSeekForPrevOrEqual(const Slice& target) {
  HashSeek(target);
  if (!Valid()) {
    SeekToLast();
  }
  while (Valid() && !(list_->Equal(key(), target) || list_->LessThan(key(), target))) {
    Prev();
  }
}


template <class Comparator>
bool InlineUserKeyIndex<Comparator>::KeyIsAfterNode(const char *key,
                                                    VersionNode *n) const {
  // nullptr n is considered infinite
  assert(n != nullptr);
  return (n != nullptr) && (compare_(n->Key(), key) < 0);
}

template <class Comparator>
bool InlineUserKeyIndex<Comparator>::KeyIsAfterNode(const DecodedKey &key,
                                                    VersionNode *n) const {
  // nullptr n is considered infinite
  assert(n != nullptr);
  return (n != nullptr) && (compare_(n->Key(), key) < 0);
}

template <class Comparator>
VersionNode *InlineUserKeyIndex<Comparator>::FindGreaterOrEqualRef(
    const char *key, TreeVoidRef *&curRef) const {
  Slice buf = GetLengthPrefixedSlice(key);
  if (ART_PRINT_LOG) {
    std::cout << "-------InlineUserKeyIndex<Comparator>::FindGreaterOrEqual "
                 "begin. input buff: ";
    for (size_t i = 0; i < buf.size(); i++)
      std::cout << " " << (int)buf[i];
    std::cout << std::endl;
  }
  MvccKey mk;
  mk.parseKey(buf);
  if (ART_PRINT_LOG) {
    std::cout << "parse buff to key: ";
    mk.dump();
  }

  curRef = userKeyIndex_->getGETreeVoidRef(
      mk.userKey_); // 找到满足userkey比对关系的位置
  if (curRef == nullptr) {
    if (ART_PRINT_LOG)
      std::cout << "cannot find artRef for key:" << mk.userKey_.ToString()
                << std::endl;
    return nullptr;
  }

  VersionNode *retNode = curRef->getContentList();
  if (ART_PRINT_LOG) {
    Slice buf1 = GetLengthPrefixedSlice(retNode->Key());
    MvccKey mk1;
    mk1.parseKey(buf1);
    mk1.dump();
  }
  while (retNode == nullptr ||
         compare_(retNode->Key(), key) < 0) { // 比对hlc和seqnum部分的大小。
    if (ART_PRINT_LOG) {
      std::cout << "WARN:  compare_(retNode->Key(), key) failed ... retcode: "
                << std::endl;
      if (retNode == nullptr) {
        std::cout << "retnode is null." << std::endl;
      } else {
        Slice buf1 = GetLengthPrefixedSlice(retNode->Key());
        MvccKey mk1;
        mk1.parseKey(buf1);
        mk1.dump();
      }
    }

    if (retNode == nullptr) {
      curRef =
          curRef
              ->Next(); // hlc和seqnum都不匹配时，获取下一个 userkey 继续比对。
      if (curRef == nullptr)
        return nullptr;
      retNode = curRef->getContentList();
    } else {
      retNode = retNode->Next();
    }
  }

  return retNode;
}

template <class Comparator>
VersionNode *InlineUserKeyIndex<Comparator>::HashFindGreaterOrEqualRef(
        const char *key, TreeVoidRef *&curRef) const {
      Slice buf = GetLengthPrefixedSlice(key);
      if (ART_PRINT_LOG) {
        std::cout << "-------InlineUserKeyIndex<Comparator>::FindGreaterOrEqual "
                     "begin. input buff: ";
        for (size_t i = 0; i < buf.size(); i++)
          std::cout << " " << (int)buf[i];
        std::cout << std::endl;
      }
      MvccKey mk;
      mk.parseKey(buf);
      if (ART_PRINT_LOG) {
        std::cout << "parse buff to key: ";
        mk.dump();
      }

      auto uMapIter = hashMap_.find(mk.userKey_.ToString());
      if (uMapIter != hashMap_.end()) {
        curRef = uMapIter->second;
      } else {
        curRef = userKeyIndex_->getGETreeVoidRef(mk.userKey_); // 找到满足userkey比对关系的位置
      }
      if (curRef == nullptr) {
        if (ART_PRINT_LOG)
          std::cout << "cannot find artRef for key:" << mk.userKey_.ToString()
                    << std::endl;
        return nullptr;
      }

      VersionNode *retNode = curRef->getContentList();
      if (ART_PRINT_LOG) {
        Slice buf1 = GetLengthPrefixedSlice(retNode->Key());
        MvccKey mk1;
        mk1.parseKey(buf1);
        mk1.dump();
      }
      while (retNode == nullptr ||
             compare_(retNode->Key(), key) < 0) { // 比对hlc和seqnum部分的大小。
        if (ART_PRINT_LOG) {
          std::cout << "WARN:  compare_(retNode->Key(), key) failed ... retcode: "
                    << std::endl;
          if (retNode == nullptr) {
            std::cout << "retnode is null." << std::endl;
          } else {
            Slice buf1 = GetLengthPrefixedSlice(retNode->Key());
            MvccKey mk1;
            mk1.parseKey(buf1);
            mk1.dump();
          }
        }

        if (retNode == nullptr) {
          curRef =
                  curRef
                          ->Next(); // hlc和seqnum都不匹配时，获取下一个 userkey 继续比对。
          if (curRef == nullptr)
            return nullptr;
          retNode = curRef->getContentList();
        } else {
          retNode = retNode->Next();
        }
      }

      return retNode;
    }

template <class Comparator>
VersionNode *InlineUserKeyIndex<Comparator>::FindGreaterOrEqualRef(
    const Slice& key, TreeVoidRef *&curRef) const {
  if (ART_PRINT_LOG) {
    std::cout << "-------InlineUserKeyIndex<Comparator>::FindGreaterOrEqual "
                 "begin. input buff: ";
    for (size_t i = 0; i < key.size(); i++)
      std::cout << " " << (int)key[i];
    std::cout << std::endl;
  }
  MvccKey mk;
  mk.parseKey(key);
  if (ART_PRINT_LOG) {
    std::cout << "parse buff to key: ";
    mk.dump();
  }

  curRef = userKeyIndex_->getGETreeVoidRef(
      mk.userKey_); // 找到满足userkey比对关系的位置
  if (curRef == nullptr) {
    if (ART_PRINT_LOG)
      std::cout << "cannot find artRef for key:" << mk.userKey_.ToString()
                << std::endl;
    return nullptr;
  }

  VersionNode *retNode = curRef->getContentList();
  if (ART_PRINT_LOG) {
    Slice buf1 = GetLengthPrefixedSlice(retNode->Key());
    MvccKey mk1;
    mk1.parseKey(buf1);
    mk1.dump();
  }

  while (retNode == nullptr ||
         compare_(retNode->Key(), key) < 0) { // 比对hlc和seqnum部分的大小。
    if (ART_PRINT_LOG) {
      std::cout << "WARN:  compare_(retNode->Key(), key) failed ... retcode: "
                << std::endl;
      if (retNode == nullptr) {
        std::cout << "retnode is null." << std::endl;
      } else {
        Slice buf1 = GetLengthPrefixedSlice(retNode->Key());
        MvccKey mk1;
        mk1.parseKey(buf1);
        mk1.dump();
      }
    }

    if (retNode == nullptr) {
      curRef =
          curRef
              ->Next(); // hlc和seqnum都不匹配时，获取下一个 userkey 继续比对。
      if (curRef == nullptr)
        return nullptr;
      retNode = curRef->getContentList();
    } else {
      retNode = retNode->Next();
    }
  }

  return retNode;
}

template <class Comparator>
VersionNode* InlineUserKeyIndex<Comparator>::HashFindGreaterOrEqualRef(const Slice& key, TreeVoidRef* &curRef) const {
  if (ART_PRINT_LOG){
    std::cout << "-------InlineART<Comparator>::HashFindGreaterOrEqualRef begin. input buff: ";
    for(size_t i = 0 ; i < key.size(); i++) std::cout << " " << (int)key[i];
    std::cout << std::endl;
  }
  MvccKey mk;
  mk.parseKey(key);
  if (ART_PRINT_LOG){
    std::cout << "parse buff to key: ";
    mk.dump();
  }

  //先在Hash表里面找，如果没有找到，则在ART里面找。
  auto uMapIter = hashMap_.find(mk.userKey_.ToString());   ////Slice类型  ToString(true)---16进制
  if (uMapIter != hashMap_.end()) {
    curRef = uMapIter->second;
  } else {
    curRef = userKeyIndex_->getGETreeVoidRef(mk.userKey_); // 找到满足userkey比对关系的位置
  }
  if (curRef == nullptr){
    if (ART_PRINT_LOG)
      std::cout << "cannot find artRef for key:" << mk.userKey_.ToString() << std::endl;
    return nullptr;
  }


  VersionNode* retNode = curRef->getContentList();
  if (ART_PRINT_LOG)
  {
    Slice buf1 = GetLengthPrefixedSlice(retNode->Key());
    MvccKey mk1;
    mk1.parseKey(buf1);
    mk1.dump();
  }
  while(retNode == nullptr || compare_(retNode->Key(), key) < 0){ // 比对hlc和seqnum部分的大小。
    if (ART_PRINT_LOG){
      std::cout << "WARN:  compare_(retNode->Key(), key) failed ... retcode: " <<std::endl;
      if (retNode == nullptr){
        std::cout << "retnode is null." <<std::endl;
      }else{
        Slice buf1 = GetLengthPrefixedSlice(retNode->Key());
        MvccKey mk1;
        mk1.parseKey(buf1);
        mk1.dump();
      }
    }

    if (retNode == nullptr){
      curRef = curRef->Next(); // hlc和seqnum都
      // 不匹配时，获取下一个 userkey 继续比对。
      if (curRef == nullptr)
        return nullptr;
      retNode = curRef->getContentList();
    }else{
      retNode = retNode->Next();
    }
  }

  return retNode;
}

template <class Comparator>
VersionNode *
InlineUserKeyIndex<Comparator>::FindGreaterOrEqual(const char *key) const {
  TreeVoidRef *curRef = nullptr;
  return FindGreaterOrEqualRef(key, curRef);
}

template <class Comparator>
VersionNode *
InlineUserKeyIndex<Comparator>::FindLessThan(const char *key,
                                             VersionNode **prev) const {
  Slice buf = GetLengthPrefixedSlice(key);
  MvccKey mk;
  mk.parseKey(buf);

  TreeVoidRef *curRef = this->userKeyIndex_->getGETreeVoidRef(mk.userKey_);
  VersionNode *preNode = curRef->getContentList();
  VersionNode *nextNode = nullptr;
  while (true) {
    if (preNode == nullptr || compare_(preNode->Key(), key) >= 0) {
      curRef = curRef->Prev();
      if (curRef == nullptr)
        return nullptr;
      preNode = curRef->getContentList();
      continue;
    }
    nextNode = preNode->Next();
    if (nextNode == nullptr || compare_(nextNode->Key(), key) >= 0) {
      return preNode;
    }
    preNode = nextNode;
    nextNode = nextNode->Next();
  }
}

template <class Comparator>
VersionNode *
InlineUserKeyIndex<Comparator>::FindLastRef(TreeVoidRef *&curRef) const {
  curRef = this->userKeyIndex_->tail();
  if (curRef == nullptr) {
    return nullptr;
  }
  VersionNode *prevNode = nullptr;
  VersionNode *curNode = curRef->getContentList();
  while (true) {
    if (curNode == nullptr) {
      if (prevNode != nullptr)
        return prevNode;
      curRef = curRef->Prev();
      if (curRef == nullptr)
        return nullptr;
      curNode = curRef->getContentList();
      continue;
    }
    prevNode = curNode;
    curNode = curNode->Next();
  }
}
template <class Comparator>
VersionNode *InlineUserKeyIndex<Comparator>::FindLast() const {
  TreeVoidRef *curRef = nullptr;
  return FindLastRef(curRef);
}

template <class Comparator>
InlineUserKeyIndex<Comparator>::InlineUserKeyIndex(const Comparator cmp,
                                                   UserKeyIndexType type)
    : compare_(cmp) {
  userKeyIndex_ = new UserKeyIndex(type);
}

template <class Comparator>
char *InlineUserKeyIndex<Comparator>::AllocateKey(size_t key_size) {
  assert(false); // should not run here.
  return nullptr;
}

template <class Comparator>
bool InlineUserKeyIndex<Comparator>::InsertConcurrently(const char *key) {
  return this->Insert(key);
}

template <class Comparator>
bool InlineUserKeyIndex<Comparator>::InsertWithHint(const char *key,
                                                    void **hint) {
  return this->Insert(key);
}

template <class Comparator>
bool InlineUserKeyIndex<Comparator>::Insert(const char *key) {
  auto x =
      const_cast<VersionNode *>(reinterpret_cast<const VersionNode *>(key));
  x->SetNext(nullptr);
  x->SetPrev(nullptr);
  assert(x->Key());
  Slice buf = GetLengthPrefixedSlice(x->Key());
  assert(buf.size() > 0);
  MvccKey mk;
  mk.parseKey(buf);

  TreeVoidRef *curRef = this->userKeyIndex_->getTreeVoidRef(mk.userKey_, x);
  assert(curRef != nullptr);

  // Add hash index for userKey
  std::unique_lock<std::mutex> lk(mtx_);
  this->hashMap_.insert(std::pair<std::string, TreeVoidRef*> {mk.userKey_.ToString(), curRef});
  lk.unlock();

  while (true) { // CAS 无锁并发机制， 冲突重试机制
    VersionNode *preNode = curRef->getContentList();
    if (preNode == nullptr) { // 该 userkey 尚未有value时
      if (!curRef->CASContentList(nullptr, x))
        continue;
      else {
        return true;
      }
    }
    if ((void *)(preNode) ==
        (void *)(x)) { // 新建的artref，已经将node放入其中。
      return true;
    }
    int compp = compare_(preNode->Key(), x->Key());
    if (compp == 0) {
      // duplicate key
      return false;
    }
    if (compp >= 0) { // 链表按照 从小到大，当前key小，则排在最前面。
      x->SetNext(preNode);
      if (!curRef->CASContentList(preNode, x))
        continue;
      else {
        preNode->SetPrev(x);
        return true;
      }
    }

    VersionNode *nextNode = preNode->Next();
    while (nextNode != nullptr) { // 寻找 当前key对应的前后节点
      if ((void *)(nextNode) ==
          (void *)(x)) { // 可能存在 x插入后，其他线程将 node也插入到artref中。
        return true;
      }
      int cmp = compare_(nextNode->Key(), x->Key());
      if (cmp < 0) {
        preNode = nextNode;
        nextNode = nextNode->Next();
      } else if (cmp == 0) {
        // duplicate key
        return false;
      } else {
        break;
      }
    }

    x->SetNext(nextNode);
    x->SetPrev(preNode);

    if (!preNode->CASNext(nextNode, x)) {
      continue;
    }
    if (nextNode != nullptr)
      nextNode->CASPrev(preNode, x);
    return true;
  }
  return false;
}

template <class Comparator>
bool InlineUserKeyIndex<Comparator>::Contains(const char *key) const {
  VersionNode *x = FindGreaterOrEqual(key);
  if (x != nullptr && Equal(key, x->Key())) {
    return true;
  } else {
    return false;
  }
}

} // namespace rocksdb
